import com.google.common.collect.Maps;
import com.tplhk.thread.completeableFuture.service.CarInfo;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * @ClassName :
 * @Description : 另一个经典的例子
 * @Author : jqxu
 * @Date: 2022/4/27 9:11
 **/
@Slf4j
@SpringBootTest(classes = {FutureLesson3.class})
public class FutureLesson3 {

    ExecutorService taskExecutor = Executors.newFixedThreadPool(10);

    /**
     * complex 的使用
     * 1. 在线程中取得cars列表
     * 2. 遍历cars ,每个car  在独立线程中调用 rating(carName) 得到一个评分
     * 3. 所有 cars 评分得到后，打印car信息和评分
     *
     * @throws InterruptedException
     * @throws TimeoutException
     * @throws ExecutionException
     */
    @Test
    public void complex() throws InterruptedException, TimeoutException, ExecutionException {
        long start = System.currentTimeMillis();

        log.info("{} ", Thread.currentThread().getName());

        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> getCarInfo(), taskExecutor).thenApplyAsync((v) -> {
            Map<Integer, String> colorMapper = colorMapper(v);
            CarWapper carWapper = new CarWapper();
            carWapper.setCarInfoList(v);
            carWapper.setColorMapper(colorMapper);
            return carWapper;
        }).thenApplyAsync((v) -> {
            log.info("处理具体业务线程[{}]", Thread.currentThread().getName());
            int step = 3;
            int carInfoListSize = v.getCarInfoList().size();
            List<CarInfo> ret = Lists.newArrayList();
            for (int i = 0; i < carInfoListSize; i = i * step) {
                log.info("begin : {} ====================================================", i);
                List<CarInfo> list = v.getCarInfoList().subList(i, i + step);
                ret.addAll(threadHandle(list, v.getColorMapper()).join());
                log.info("end : {} ====================================================", i);
                i++;
            }
            return ret;
        }).thenAcceptAsync((v) -> {
            log.info("保存结果线程[{}]", Thread.currentThread().getName());
            log.info("保存:{}", v.toString());
        });

        for(;;){
            if(voidCompletableFuture.isDone()){
                break;
            }
            Thread.sleep(1 * 1000);
        }
    }


    public List<CarInfo> getCarInfo() {
        log.info("初始化要处理的数据[{}] ", Thread.currentThread().getName());
        List<CarInfo> list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            CarInfo carInfo = new CarInfo();
            carInfo.setId(i);
            carInfo.setCarName("car" + i);
            list.add(carInfo);
        }
        return list;
    }

    private Map<Integer, String> colorMapper(List<CarInfo> list){
        log.info("创建映射关系[{}]，colorMapper.list=={}", Thread.currentThread().getName(), list.toString());
        Map<Integer, String> colorHashMap = Maps.newHashMap();
        colorHashMap.put(0,"red");
        colorHashMap.put(1,"green");
        colorHashMap.put(2,"yellow");
        colorHashMap.put(3,"browse");
        colorHashMap.put(4,"white");
        colorHashMap.put(5,"bule");
        return colorHashMap;
    }

    private CompletableFuture<List<CarInfo>> threadHandle(List<CarInfo> list, Map<Integer, String> colorMapper) {
        log.info("我要在线程中处理多条业务数据[{}]: {} ", Thread.currentThread().getName(), list.toString());
        ExecutorService taskExecutor = Executors.newFixedThreadPool(10);
        return CompletableFuture.completedFuture(list).thenComposeAsync((v) -> {
            log.info("我要在(子)线程中处理多条业务数据[{}] ", Thread.currentThread().getName());
            List<CompletableFuture<CarInfo>> completableFutureList = v.stream()
                    .map(car -> doCarInfo(car, colorMapper))
                    .collect(Collectors.toList());

            return CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]))
                    .thenApply(all -> completableFutureList.stream()
                            .map(item -> item.handle((carinfo, e) -> {
                                if (null != e) {
                                    log.info("{} error!!!", carinfo.getCarName());
                                    return null;
                                }
                                return carinfo;
                            }))
                            .map(item -> item.join())
                            .filter(Objects::nonNull).collect(Collectors.toList()));
        }, taskExecutor).thenApplyAsync(Function.identity());
    }

    private CompletableFuture<CarInfo> doCarInfo(CarInfo carInfo, Map<Integer, String> colorMapper){
        return CompletableFuture.completedFuture(carInfo).thenApplyAsync((v) -> {
            log.info("我要在(孙)线程中处理每一条业务数据[{}] : {}", Thread.currentThread().getName(), carInfo.toString());
            try {
                Thread.sleep( 5*1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            v.setColorMapper(colorMapper.getOrDefault(v.getId(), "black"));
            return v;
        });
    }

    @Data
    class CarWapper{
        List<CarInfo> carInfoList;
        Map<Integer, String> colorMapper;
    }

}
